package com.guaoran.distributed.io.nio.two.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

/**
 * @Author gucheng
 * @Description 客户端
 * 2019-04-29 17:07
 */
public class NIOClient {
    private SocketChannel socketChannel;
    private Selector selector;
    private final int port;
    private final List<String> list = new ArrayList<>();



    private CountDownLatch countDownLatch = new CountDownLatch(1);


    public NIOClient(int port) throws IOException, InterruptedException {
        this.port = port;
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_CONNECT);

        socketChannel.connect(new InetSocketAddress(this.port));

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    handleKeys();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        if (countDownLatch.getCount() != 0) {
            countDownLatch.await();
        }
        System.out.println("Client 启动完成");
    }

    private void handleKeys() throws IOException {
        while (true){
            int selectNum = selector.select(30 * 1000L);
            if(selectNum == 0){
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                // 忽略无效的 SelectionKey
                if(!key.isValid()){
                    continue;
                }
                processHandle(key);
            }

        }
    }

    private synchronized void processHandle(SelectionKey key) throws IOException {

        // 连接就绪
        if(key.isConnectable()){
            connectableHandle(key);
        }
        // 读就绪
        if(key.isReadable()){
            readableHandle(key);
        }
        // 写就绪
        if(key.isWritable()){
            writableHandle(key);
        }
    }

    /**
     * 写
     * @param key
     */
    private void writableHandle(SelectionKey key) throws ClosedChannelException {
        SocketChannel channel = (SocketChannel) key.channel();
        // 遍历响应队列
        List<String> arrayList = (ArrayList<String>)key.attachment();
        arrayList.forEach(content->{
            System.out.println("写入数据："+content);
            CodecUtil.write(channel,content);
        });
        arrayList.clear();
        // 注册 client socket channel 到 selector 上
        socketChannel.register(selector,SelectionKey.OP_READ,arrayList);
    }

    /**
     * 读
     * @param key
     * @throws ClosedChannelException
     */
    private void readableHandle(SelectionKey key) throws ClosedChannelException {
        SocketChannel channel = (SocketChannel) key.channel();

        ByteBuffer byteBuffer = CodecUtil.read(channel);
        if(byteBuffer.position()>0){
            String content = CodecUtil.newStr(byteBuffer);
            System.out.println("读取数据："+content);
        }
    }

    /**
     * 连接
     * @param key
     * @throws IOException
     */
    private void connectableHandle(SelectionKey key) throws IOException {
        // 完成连接
        if(!socketChannel.isConnectionPending()){
            return;
        }
        socketChannel.finishConnect();

        System.out.println("接受新的 channel");

        // 注册 client socket channel 到 selector
        socketChannel.register(selector,SelectionKey.OP_READ,list);
        // 标记为已连接
        countDownLatch.countDown();
    }

    public synchronized void send(String content) throws ClosedChannelException {
        list.add(content);
        System.out.println("send...写入数据："+content);
        socketChannel.register(selector,SelectionKey.OP_WRITE,list);
        selector.wakeup();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        NIOClient client = new NIOClient(8081);
        for (int i = 0; i < 30; i++) {
            client.send("Hello" + i);
            Thread.sleep(1000L);
        }
    }
}
